package org.snake.nebulae.core.common;

import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
public class DefaultMqttFactory implements MqttFactory {

    @Value("${mqtt.server}")
    private String mqttServer;

    @Value("${mqtt.username}")
    private String mqttUsername;

    @Value("${mqtt.password}")
    private String mqttPassword;

    @Resource
    private ThreadPoolTaskScheduler asyncPool;

    private ConcurrentHashMap<Long, MqttAsyncClient> mqttAsyncClients = new ConcurrentHashMap<>();

    @Override
    public MqttConnectOptions getCustomMqttConnectionOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();

        mqttConnectOptions.setUserName(mqttUsername);
        mqttConnectOptions.setPassword(mqttPassword.toCharArray());
        mqttConnectOptions.setCleanSession(true);

        return mqttConnectOptions;
    }

    @Override
    public MqttAsyncClient getCustomMqttAsyncClient() throws MqttException {
        String clientId = IdUtil.simpleUUID();

        MqttClientPersistence memoryPersistence = new MemoryPersistence();
        MqttPingSender scheduledExecutorPingSender = new ScheduledExecutorPingSender(asyncPool.getScheduledThreadPoolExecutor());

        MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(mqttServer, clientId, memoryPersistence, scheduledExecutorPingSender, asyncPool.getScheduledThreadPoolExecutor());
        mqttAsyncClient.connect(getCustomMqttConnectionOptions()).waitForCompletion();

        mqttAsyncClients.put(System.currentTimeMillis(), mqttAsyncClient);

        return mqttAsyncClient;
    }

    @Override
    public MqttAsyncClient choseOneClient() {
        if (mqttAsyncClients.size() > 0) {
            return mqttAsyncClients.elements().nextElement();
        }

        try {
            return getCustomMqttAsyncClient();
        } catch (MqttException e) {
            log.error("获取mqttclient出错: {}", e.getMessage());
        }

        return null;
    }
}
